In [2]:
%run startup.py

In [3]:
%%javascript
$.getScript('./assets/js/ipython_notebook_toc.js')


A Decision Tree of Observable Operators

Part 6: Looking at Entire Streams

source: http://reactivex.io/documentation/operators.html#tree.
(transcribed to RxPY 1.5.7, Py2.7 / 2016-12, Gunther Klessinger, axiros)

This tree can help you find the ReactiveX Observable operator you’re looking for.
See Part 1 for Usage and Output Instructions.

We also require acquaintance with the marble diagrams feature of RxPy.

Table of Contents

I want to evaluate the entire sequence of items emitted by an Observable

... and emit a single boolean indicating if all of the items pass some test all


In [7]:
rst(O.all)
for i in 9, 10:
    d = subs(O.range(10, 20).all(lambda v: v > i))



========== all ==========

module rx.linq.observable.all
@extensionmethod(Observable, alias="every")
def all(self, predicate):
    Determines whether all elements of an observable sequence satisfy a
    condition.

    1 - res = source.all(lambda value: value.length > 3)

    Keyword arguments:
    :param bool predicate: A function to test each element for a condition.

    :returns: An observable sequence containing a single element determining
    whether all elements in the source sequence pass the test in the
    specified predicate.
--------------------------------------------------------------------------------

   1.5     M New subscription on stream 276540965
   3.9     M [next]    2.4: True
   4.2     M [cmpl]    2.6: fin

   4.7     M New subscription on stream 276540989
   5.1     M [next]    0.3: False
   5.3     M [cmpl]    0.5: fin

... and emit a single boolean indicating if the Observable emitted any item (that passes some test) contains, find_index, some


In [19]:
rst(O.contains)
d = subs(O.range(10, 20).contains(11))
header("equality operation")
d = subs(O.range(10, 20).contains(11, comparer=lambda x, y: y == x + 1))



========== contains ==========

module rx.linq.observable.contains
@extensionmethod(ObservableBase)
def contains(self, value, comparer=None):
    Determines whether an observable sequence contains a specified
    element with an optional equality comparer.

    Example
    1 - res = source.contains(42)
    2 - res = source.contains({ "value": 42 }, lambda x, y: x["value"] == y["value")

    Keyword parameters:
    value -- The value to locate in the source sequence.
    comparer -- {Function} [Optional] An equality comparer to compare elements.

    Returns an observable {Observable} sequence containing a single element
    determining whether the source sequence contains an element that has
    the specified value.
--------------------------------------------------------------------------------

   1.2     M New subscription on stream 276590957
   2.0     M [next]    0.6: True
   2.2     M [cmpl]    0.7: fin


========== equality operation ==========


   3.4     M New subscription on stream 276590973
   3.8     M [next]    0.3: True
   3.9     M [cmpl]    0.4: fin

In [42]:
def comparer(*a):
    ''' find_index: you get value, index and the observable itself as argument
        some: you get only the value
    '''
    log('comparer args:', *a)
    l.append(1)
    if len(l) > 3:
        # => index will be 0
        return True
    
stream = O.from_((4, 1, 2, 1, 3))
for name in 'find_index', 'some':    
    l = []
    operator = getattr(stream, name)
    rst(operator) # output documentation, reset timer
    d = subs(operator(lambda *a: comparer(*a)))



========== find_index ==========

module rx.linq.observable.findindex
@extensionmethod(ObservableBase)
def find_index(self, predicate):
    Searches for an element that matches the conditions defined by the
    specified predicate, and returns an Observable sequence with the
    zero-based index of the first occurrence within the entire Observable
    sequence.

    Keyword Arguments:
    predicate -- {Function} The predicate that defines the conditions of the
        element to search for.

    Returns an observable {Observable} sequence with the zero-based index of
    the first occurrence of an element that matches the conditions defined
    by match, if found; otherwise, -1.
--------------------------------------------------------------------------------

   1.6     M New subscription on stream 276591037
   2.0     M comparer args: 4 [0] <rx.core.Observable.Observable object at 0x107bae610>
   2.3     M comparer args: 1 [1] <rx.core.Observable.Observable object at 0x107bae610>
   2.5     M comparer args: 2 [2] <rx.core.Observable.Observable object at 0x107bae610>
   2.8     M comparer args: 1 [3] <rx.core.Observable.Observable object at 0x107bae610>
   3.0     M [next]    1.4: 3
   3.1     M [cmpl]    1.5: fin


========== some ==========

module rx.linq.observable.some
@extensionmethod(ObservableBase)
def some(self, predicate=None):
    Determines whether some element of an observable sequence satisfies a
    condition if present, else if some items are in the sequence.

    Example:
    result = source.some()
    result = source.some(lambda x: x > 3)

    Keyword arguments:
    predicate -- A function to test each element for a condition.

    Returns {Observable} an observable sequence containing a single element
    determining whether some elements in the source sequence pass the test
    in the specified predicate if given, else if some items are in the
    sequence.
--------------------------------------------------------------------------------

   1.6     M New subscription on stream 276563317
   2.0     M comparer args: 4
   2.3     M comparer args: 1
   2.7     M comparer args: 2
   2.9     M comparer args: 1
   3.0     M [next]    1.3: True
   3.2     M [cmpl]    1.5: fin

... and emit a single boolean indicating if the Observable emitted no items is_empty


In [45]:
rst(O.is_empty)
d = subs(O.from_([]).is_empty())



========== is_empty ==========

module rx.linq.observable.isempty
@extensionmethod(ObservableBase)
def is_empty(self):
    Determines whether an observable sequence is empty.

    Returns an observable {Observable} sequence containing a single element
    determining whether the source sequence is empty.
--------------------------------------------------------------------------------

   1.9     M New subscription on stream 276596853
   2.4     M [next]    0.4: True
   2.7     M [cmpl]    0.7: fin

... and emit a single boolean indicating if the sequence is identical to one emitted by a second Observable sequence_equal


In [75]:
rst(O.sequence_equal)
def f(x, y):
    log('got', x, y)
    return str(y) == str(x).upper()
d = subs(O.from_('rxpy rocks').sequence_equal(
         O.from_('RXPY ROCKS'), lambda x, y: f(x, y)))

header("there is no order guarantee of arguments in the comparer:")
d = subs(marble_stream('a-b------c-d|').sequence_equal(
         marble_stream('A-B----C-D|'), lambda x, y: f(x, y)))



========== sequence_equal ==========

module rx.linq.observable.sequenceequal
@extensionmethod(ObservableBase)
def sequence_equal(self, second, comparer=None):
    Determines whether two sequences are equal by comparing the
    elements pairwise using a specified equality comparer.

    1 - res = source.sequence_equal([1,2,3])
    2 - res = source.sequence_equal([{ "value": 42 }], lambda x, y: x.value == y.value)
    3 - res = source.sequence_equal(rx.return_value(42))
    4 - res = source.sequence_equal(rx.return_value({ "value": 42 }), lambda x, y: x.value == y.value)

    second -- Second observable sequence or array to compare.
    comparer -- [Optional] Comparer used to compare elements of both sequences.

    Returns an observable sequence that contains a single element which
    indicates whether both sequences are of equal length and their
    corresponding elements are equal according to the specified equality
    comparer.
--------------------------------------------------------------------------------

   4.8     M New subscription on stream 279265221
   5.3     M got r R
   5.6     M got x X
   6.4     M got p P
   6.8     M got y Y
   7.0     M got    
   7.4     M got r R
   7.6     M got o O
   8.0     M got c C
   8.5     M got k K
   8.9     M got s S
   9.4     M [next]    4.6: True
   9.6     M [cmpl]    4.8: fin


========== there is no order guarantee of arguments in the comparer: ==========


  11.4     M New subscription on stream 276578009
  27.6     M got a A
 139.1     M got b B
 749.0  T988 got C c
 749.6  T988 [next]  738.1: False
 749.7  T988 [cmpl]  738.2: fin

... and emit the average of all of their values average


In [83]:
rst(O.average)
d = subs(O.from_('1199').average(lambda x: int(x)))



========== average ==========

module rx.linq.observable.average
@extensionmethod(ObservableBase)
def average(self, key_mapper=None):
    Computes the average of an observable sequence of values that are in
    the sequence or obtained by invoking a transform function on each
    element of the input sequence if present.

    Example
    res = source.average();
    res = source.average(lambda x: x.value)

    :param Observable self: Observable to average.
    :param types.FunctionType key_mapper: A transform function to apply to
        each element.

    :returns: An observable sequence containing a single element with the
        average of the sequence of values.
    :rtype: Observable
--------------------------------------------------------------------------------

   2.5     M New subscription on stream 279255829
   4.0     M [next]    1.4: 5.0
   4.1     M [cmpl]    1.5: fin

... and emit the sum of all of their values sum


In [84]:
rst(O.sum)
d = subs(O.from_('1199').sum(lambda x: int(x)))



========== sum ==========

module rx.linq.observable.sum
@extensionmethod(ObservableBase)
def sum(self, key_mapper=None):
    Computes the sum of a sequence of values that are obtained by
    invoking an optional transform function on each element of the input
    sequence, else if not specified computes the sum on each item in the
    sequence.

    Example
    res = source.sum()
    res = source.sum(lambda x: x.value)

    key_mapper -- {Function} [Optional] A transform function to apply to
        each element.

    Returns an observable {Observable} sequence containing a single element
    with the sum of the values in the source sequence.
--------------------------------------------------------------------------------

   2.6     M New subscription on stream 276565865
   3.9     M [next]    1.2: 20
   4.1     M [cmpl]    1.4: fin

... and emit a number indicating how many items were in the sequence count


In [88]:
rst(O.count)
d = subs(O.from_('1199').count(lambda x: int(x) > 2))



========== count ==========

module rx.linq.observable.count
@extensionmethod(ObservableBase)
def count(self, predicate=None):
    Returns an observable sequence containing a value that represents
    how many elements in the specified observable sequence satisfy a
    condition if provided, else the count of items.

    1 - res = source.count()
    2 - res = source.count(lambda x: x > 3)

    Keyword arguments:
    :param types.FunctionType predicate: A function to test each element for a
        condition.

    :returns: An observable sequence containing a single element with a
    number that represents how many elements in the input sequence satisfy
    the condition in the predicate function if provided, else the count of
    items in the sequence.
    :rtype: Observable
--------------------------------------------------------------------------------

   1.8     M New subscription on stream 279255949
   3.6     M [next]    1.6: 2
   3.7     M [cmpl]    1.8: fin

...and emit the item with the maximum value max, max_by


In [91]:
rst(O.max)
d = subs(O.from_('1199').max(lambda x, y: int(x) + int(y) < 5))



========== max ==========

module rx.linq.observable.max
@extensionmethod(ObservableBase)
def max(self, comparer=None):
    Returns the maximum value in an observable sequence according to the
    specified comparer.

    Example
    res = source.max()
    res = source.max(lambda x, y:  x.value - y.value)

    Keyword arguments:
    comparer -- {Function} [Optional] Comparer used to compare elements.

    Returns {Observable} An observable sequence containing a single element
    with the maximum element in the source sequence.
--------------------------------------------------------------------------------

   1.5     M New subscription on stream 279255829
   3.0     M [next]    1.3: 1
   3.1     M [cmpl]    1.5: fin

In [97]:
rst(O.max_by)
d = subs(O.from_('1271246').max_by(lambda x: int(x)  < 5))



========== max_by ==========

module rx.linq.observable.maxby
@extensionmethod(ObservableBase)
def max_by(self, key_mapper, comparer=None):
    Returns the elements in an observable sequence with the maximum
    key value according to the specified comparer.

    Example
    res = source.max_by(lambda x: x.value)
    res = source.max_by(lambda x: x.value, lambda x, y: x - y)

    Keyword arguments:
    key_mapper -- {Function} Key mapper function.
    comparer -- {Function} [Optional] Comparer used to compare key values.

    Returns an observable {Observable} sequence containing a list of zero
    or more elements that have a maximum key value.
--------------------------------------------------------------------------------

   1.5     M New subscription on stream 276577929
   2.8     M [next]    1.1: ['1', '2', '1', '2', '4']
   3.3     M [cmpl]    1.7: fin

... and emit the item with the minimum value min, min_by


In [99]:
rst(O.min)
d = subs(O.from_('1199').min(lambda x, y: int(x) + int(y) > 5))



========== min ==========

module rx.linq.observable.min
@extensionmethod(ObservableBase)
def min(self, comparer=None):
    Returns the minimum element in an observable sequence according to
    the optional comparer else a default greater than less than check.

    Example
    res = source.min()
    res = source.min(lambda x, y: x.value - y.value)

    comparer -- {Function} [Optional] Comparer used to compare elements.

    Returns an observable sequence {Observable} containing a single element
    with the minimum element in the source sequence.
--------------------------------------------------------------------------------

   1.5     M New subscription on stream 279257813
   2.7     M [next]    1.0: 1
   2.8     M [cmpl]    1.2: fin

In [100]:
rst(O.min_by)
d = subs(O.from_('1271246').min_by(lambda x: int(x)  < 5))



========== min_by ==========

module rx.linq.observable.minby
@extensionmethod(ObservableBase)
def min_by(self, key_mapper, comparer=None):
    Returns the elements in an observable sequence with the minimum key
    value according to the specified comparer.

    Example
    res = source.min_by(lambda x: x.value)
    res = source.min_by(lambda x: x.value, lambda x, y: x - y)

    Keyword arguments:
    key_mapper -- {Function} Key mapper function.
    comparer -- {Function} [Optional] Comparer used to compare key values.

    Returns an observable {Observable} sequence containing a list of zero
    or more elements that have a minimum key value.
--------------------------------------------------------------------------------

   1.7     M New subscription on stream 276595497
   3.1     M [next]    1.3: ['7', '6']
   3.3     M [cmpl]    1.6: fin

... by applying an aggregation function to each item in turn (or recursing into its result) and emitting the result scan, expand


In [110]:
rst(O.scan)
# printing original value next to the aggregate:
d = subs(O.from_('12345').scan(
        lambda x, y: [y, int(x[1]) + int(y)],
        seed=[0, 0]))



========== scan ==========

module rx.linq.observable.scan
@extensionmethod(ObservableBase)
def scan(self, accumulator, seed=None):
    Applies an accumulator function over an observable sequence and
    returns each intermediate result. The optional seed value is used as
    the initial accumulator value. For aggregation behavior with no
    intermediate results, see Observable.aggregate.

    1 - scanned = source.scan(lambda acc, x: acc + x)
    2 - scanned = source.scan(lambda acc, x: acc + x, 0)

    Keyword arguments:
    accumulator -- An accumulator function to be invoked on each element.
    seed -- [Optional] The initial accumulator value.

    Returns an observable sequence containing the accumulated values.
--------------------------------------------------------------------------------

   2.4     M New subscription on stream 278466789
   3.0     M [next]    0.4: ['1', 1]
   3.2     M [next]    0.7: ['2', 3]
   3.5     M [next]    0.9: ['3', 6]
   3.7     M [next]    1.2: ['4', 10]
   4.0     M [next]    1.4: ['5', 15]
   4.2     M [cmpl]    1.7: fin

In [115]:
rst(O.expand)
# printing original value next to the aggregate:
d = subs(O.just(42).expand(lambda x: O.just(42 + x)).take(5))



========== expand ==========

module rx.linq.observable.expand
@extensionmethod(ObservableBase)
def expand(self, mapper, scheduler=None):
    Expands an observable sequence by recursively invoking mapper.

    mapper -- {Function} Selector function to invoke for each produced
        element, resulting in another sequence to which the mapper will be
        invoked recursively again.
    scheduler -- {Scheduler} [Optional] Scheduler on which to perform the
        expansion. If not provided, this defaults to the current thread
        scheduler.

    Returns an observable {Observable} sequence containing all the elements
    produced by the recursive expansion.
--------------------------------------------------------------------------------

   4.0     M New subscription on stream 278470673
   4.6     M [next]    0.5: 42
   5.3     M [next]    1.2: 84
   5.7     M [next]    1.6: 126
   6.3     M [next]    2.1: 168
   6.5     M [next]    2.4: 210
   6.6     M [cmpl]    2.5: fin

In [ ]: